Implement named pipe support
authorpavelxdd <pavel.otchertsov@gmail.com>
Mon, 20 Aug 2018 21:46:51 +0000 (00:46 +0300)
committerpavelxdd <pavel.otchertsov@gmail.com>
Tue, 21 Aug 2018 00:25:39 +0000 (03:25 +0300)
16 files changed:
Debug/src/siri/net/subdir.mk
Release/src/siri/net/subdir.mk
include/siri/cfg/cfg.h
include/siri/net/clserver.h
include/siri/net/pipe.h [new file with mode: 0644]
siridb.conf
src/siri/cfg/cfg.c
src/siri/db/auth.c
src/siri/db/insert.c
src/siri/db/query.c
src/siri/net/bserver.c
src/siri/net/clserver.c
src/siri/net/pipe.c [new file with mode: 0644]
src/siri/net/pkg.c
src/siri/parser/listener.c
src/siri/siri.c

index 098b149f742cbe19af9c62fa5c11665520174081..c6b2b66641a24f1be015d7c69f4b7d6e0cfb66bc 100644 (file)
@@ -10,7 +10,8 @@ C_SRCS += \
 ../src/siri/net/promise.c \
 ../src/siri/net/promises.c \
 ../src/siri/net/protocol.c \
-../src/siri/net/socket.c
+../src/siri/net/socket.c \
+../src/siri/net/pipe.c
 
 OBJS += \
 ./src/siri/net/bserver.o \
@@ -19,7 +20,8 @@ OBJS += \
 ./src/siri/net/promise.o \
 ./src/siri/net/promises.o \
 ./src/siri/net/protocol.o \
-./src/siri/net/socket.o
+./src/siri/net/socket.o \
+./src/siri/net/pipe.o
 
 C_DEPS += \
 ./src/siri/net/bserver.d \
@@ -28,7 +30,8 @@ C_DEPS += \
 ./src/siri/net/promise.d \
 ./src/siri/net/promises.d \
 ./src/siri/net/protocol.d \
-./src/siri/net/socket.d
+./src/siri/net/socket.d \
+./src/siri/net/pipe.d
 
 
 # Each subdirectory must supply rules for building sources it contributes
@@ -38,5 +41,3 @@ src/siri/net/%.o: ../src/siri/net/%.c
        gcc -DDEBUG=1 -I../include -O0 -g3 -Wall -Wextra $(CPPFLAGS) $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<"
        @echo 'Finished building: $<'
        @echo ' '
-
-
index c5ef8a3b0608d61f104827d597c984b3338eaece..54657d92db556ac968375090084908638622e9a4 100644 (file)
@@ -10,7 +10,8 @@ C_SRCS += \
 ../src/siri/net/promise.c \
 ../src/siri/net/promises.c \
 ../src/siri/net/protocol.c \
-../src/siri/net/socket.c
+../src/siri/net/socket.c \
+../src/siri/net/pipe.c
 
 OBJS += \
 ./src/siri/net/bserver.o \
@@ -19,7 +20,8 @@ OBJS += \
 ./src/siri/net/promise.o \
 ./src/siri/net/promises.o \
 ./src/siri/net/protocol.o \
-./src/siri/net/socket.o
+./src/siri/net/socket.o \
+./src/siri/net/pipe.o
 
 C_DEPS += \
 ./src/siri/net/bserver.d \
@@ -28,7 +30,8 @@ C_DEPS += \
 ./src/siri/net/promise.d \
 ./src/siri/net/promises.d \
 ./src/siri/net/protocol.d \
-./src/siri/net/socket.d
+./src/siri/net/socket.d \
+./src/siri/net/pipe.d
 
 
 # Each subdirectory must supply rules for building sources it contributes
@@ -38,5 +41,3 @@ src/siri/net/%.o: ../src/siri/net/%.c
        gcc -I../include -O3 -Wall -Wextra $(CPPFLAGS) $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<"
        @echo 'Finished building: $<'
        @echo ' '
-
-
index 446520fc663e9bdd3bd2e5467415f189ab03719f..51b0d4b01895ae23422312180d607b6e87d5c366 100644 (file)
@@ -23,6 +23,8 @@ typedef struct siri_cfg_s
     uint8_t shard_compression;
     char server_address[SIRI_CFG_MAX_LEN_ADDRESS];
     char default_db_path[SIRI_PATH_MAX];
+    uint8_t pipe_support;
+    char pipe_client_name[SIRI_PATH_MAX];
 } siri_cfg_t;
 
 void siri_cfg_init(siri_t * siri);
index 4bf7b81d31f5e5185fdf080f87d57ca378dc7b56..12dd4f6d06ecc946693c34a04a6f042e858cd77f 100644 (file)
 #pragma once
 
 #include <uv.h>
+#include <siri/net/pipe.h>
+#include <siri/net/socket.h>
 #include <siri/siri.h>
 
 typedef struct siri_s siri_t;
 
+#define sirinet_client_incref(client)                                          \
+switch ((client)->type)                                                        \
+{                                                                              \
+case UV_TCP:                                                                   \
+    sirinet_socket_incref(client);                                             \
+    break;                                                                     \
+case UV_NAMED_PIPE:                                                            \
+    sirinet_pipe_incref(client);                                               \
+    break;                                                                     \
+default:                                                                       \
+    break;                                                                     \
+}
+
+#define sirinet_client_decref(client)                                          \
+switch ((client)->type)                                                        \
+{                                                                              \
+case UV_TCP:                                                                   \
+    sirinet_socket_decref(client);                                             \
+    break;                                                                     \
+case UV_NAMED_PIPE:                                                            \
+    sirinet_pipe_decref(client);                                               \
+    break;                                                                     \
+default:                                                                       \
+    uv_close((uv_handle_t *) (client), NULL);                                  \
+    break;                                                                     \
+}
+
+#define CLIENT_SIRIDB(client, siridb)                                          \
+siridb_t * siridb = NULL;                                                      \
+switch ((client)->type)                                                        \
+{                                                                              \
+case UV_TCP:                                                                   \
+    siridb = ((sirinet_socket_t *) (client)->data)->siridb;                    \
+    break;                                                                     \
+case UV_NAMED_PIPE:                                                            \
+    siridb = ((sirinet_pipe_t *) (client)->data)->siridb;                      \
+    break;                                                                     \
+default:                                                                       \
+    break;                                                                     \
+}
+
+#define CLIENT_USER(client, user)                                              \
+siridb_user_t * user = NULL;                                                   \
+switch ((client)->type)                                                        \
+{                                                                              \
+case UV_TCP:                                                                   \
+    user = (siridb_user_t *) ((sirinet_socket_t *) (client)->data)->origin;    \
+    break;                                                                     \
+case UV_NAMED_PIPE:                                                            \
+    user = (siridb_user_t *) ((sirinet_pipe_t *) (client)->data)->origin;      \
+    break;                                                                     \
+default:                                                                       \
+    break;                                                                     \
+}
+
 int sirinet_clserver_init(siri_t * siri);
 
 typedef ssize_t (*sirinet_clserver_getfile)(char ** buffer, siridb_t * siridb);
diff --git a/include/siri/net/pipe.h b/include/siri/net/pipe.h
new file mode 100644 (file)
index 0000000..8851c82
--- /dev/null
@@ -0,0 +1,57 @@
+#pragma once
+
+#include <uv.h>
+#include <siri/db/db.h>
+#include <siri/net/pkg.h>
+#include <xpath/xpath.h>
+
+#define PIPE_NAME_SZ SIRI_PATH_MAX
+#define RESET_BUF_SIZE 1048576  /*  1 MB        */
+
+typedef enum sirinet_pipe_tp
+{
+    PIPE_CLIENT,
+    PIPE_BACKEND
+} sirinet_pipe_tp_t;
+
+typedef struct siridb_s siridb_t;
+typedef struct siridb_user_s siridb_user_t;
+
+typedef void (* on_data_cb_t)(uv_stream_t * client, sirinet_pkg_t * pkg);
+typedef void (* on_free_cb_t)(uv_stream_t * client);
+
+typedef struct sirinet_pipe_s
+{
+    sirinet_pipe_tp_t tp;
+    uint32_t ref;
+    on_data_cb_t on_data;
+    on_free_cb_t on_free;
+    siridb_t * siridb;
+    void * origin;  /* can be a user, server or NULL */
+    char * buf;
+    size_t len;
+    size_t size;
+    uv_pipe_t pipe;
+} sirinet_pipe_t;
+
+uv_pipe_t * sirinet_pipe_new(
+        sirinet_pipe_tp_t tp,
+        on_data_cb_t cb_data,
+        on_free_cb_t cb_free);
+void sirinet_pipe_alloc_buffer(
+        uv_handle_t * handle,
+        size_t suggested_size,
+        uv_buf_t * buf);
+int sirinet_pipe_name(char * buffer, uv_stream_t * client);
+void sirinet_pipe_on_data(
+        uv_stream_t * client,
+        ssize_t nread,
+        const uv_buf_t * buf);
+void sirinet__pipe_free(uv_stream_t * client);
+
+#define sirinet_pipe_incref(client) \
+    ((sirinet_pipe_t *) client->data)->ref++
+
+#define sirinet_pipe_decref(client) \
+    if (!--((sirinet_pipe_t *) client->data)->ref) \
+        uv_close((uv_handle_t *) client, (uv_close_cb) sirinet__pipe_free)
index a2e899080168e22fc0e2858bd9c49a63e79f4d68..7e6c665fa9d61b1e6beebcde4f9c60547153b185 100644 (file)
@@ -60,7 +60,17 @@ heartbeat_interval = 30
 max_open_files = 32768
 
 #
-# Use shard compression for storing data points. 
+# Use shard compression for storing data points.
 # Set value 0 to disable shard compression.
 #
 enable_shard_compression = 1
+
+#
+# Enable named pipe support for client connections.
+#
+enable_pipe_support = 0
+
+#
+# SiriDB will bind the client named pipe in this location.
+#
+pipe_client_name = siridb_client.sock
index 9bacac0cec7d8fe6b77252a5971a39070f23de8e..832c5f6c20f5513ab20b0a24d0c26ebe091ffbe6 100644 (file)
@@ -29,7 +29,9 @@ static siri_cfg_t siri_cfg = {
         .ip_support=IP_SUPPORT_ALL,
         .shard_compression=0,
         .server_address="localhost",
-        .default_db_path="/var/lib/siridb/"
+        .default_db_path="/var/lib/siridb/",
+        .pipe_support=0,
+        .pipe_client_name="siridb_client.sock"
 };
 
 static void SIRI_CFG_read_uint(
@@ -47,10 +49,15 @@ static void SIRI_CFG_read_addr(
         cfgparser_t * cfgparser,
         const char * option_name,
         char ** dest);
+static void SIRI_CFG_read_pipe_name(
+        cfgparser_t * cfgparser,
+        const char * option_name,
+        char * dest);
 static void SIRI_CFG_read_default_db_path(cfgparser_t * cfgparser);
 static void SIRI_CFG_read_max_open_files(cfgparser_t * cfgparser);
 static void SIRI_CFG_read_ip_support(cfgparser_t * cfgparser);
 static void SIRI_CFG_read_shard_compression(cfgparser_t * cfgparser);
+static void SIRI_CFG_read_pipe_support(cfgparser_t * cfgparser);
 
 void siri_cfg_init(siri_t * siri)
 {
@@ -119,6 +126,16 @@ void siri_cfg_init(siri_t * siri)
             "bind_server_address",
             &siri_cfg.bind_backend_addr);
 
+    SIRI_CFG_read_pipe_support(cfgparser);
+
+    if (siri_cfg.pipe_support)
+    {
+        SIRI_CFG_read_pipe_name(
+                cfgparser,
+                "pipe_client_name",
+                &siri_cfg.pipe_client_name);
+    }
+
     cfgparser_free(cfgparser);
 }
 
@@ -273,6 +290,40 @@ static void SIRI_CFG_read_shard_compression(cfgparser_t * cfgparser)
 
 }
 
+static void SIRI_CFG_read_pipe_support(cfgparser_t * cfgparser)
+{
+    cfgparser_option_t * option;
+    cfgparser_return_t rc;
+    rc = cfgparser_get_option(
+                &option,
+                cfgparser,
+                "siridb",
+                "enable_pipe_support");
+    if (rc != CFGPARSER_SUCCESS)
+    {
+        log_debug(
+                "Missing '%s' in '%s': %s. "
+                "Disable pipe support",
+                "enable_pipe_support",
+                siri.args->config,
+                cfgparser_errmsg(rc));
+    }
+    else if (option->tp != CFGPARSER_TP_INTEGER || option->val->integer > 1)
+    {
+        log_warning(
+                "Error reading '%s' in '%s': %s. "
+                "Disable pipe support",
+                "enable_pipe_support",
+                siri.args->config,
+                "error: expecting 0 or 1");
+    }
+    else if (option->val->integer == 1)
+    {
+        siri_cfg.pipe_support = 1;
+    }
+
+}
+
 static void SIRI_CFG_read_addr(
         cfgparser_t * cfgparser,
         const char * option_name,
@@ -317,6 +368,62 @@ static void SIRI_CFG_read_addr(
     }
 }
 
+static void SIRI_CFG_read_pipe_name(
+        cfgparser_t * cfgparser,
+        const char * option_name,
+        char * dest)
+{
+    cfgparser_option_t * option;
+    cfgparser_return_t rc;
+    size_t len;
+    rc = cfgparser_get_option(
+                &option,
+                cfgparser,
+                "siridb",
+                option_name);
+    if (rc != CFGPARSER_SUCCESS)
+    {
+        log_warning(
+                "Error reading '%s' in '%s': %s. "
+                "Using default value: '%s'",
+                option_name,
+                siri.args->config,
+                cfgparser_errmsg(rc),
+                dest);
+    }
+    else if (option->tp != CFGPARSER_TP_STRING)
+    {
+        log_warning(
+                "Error reading '%s' in '%s': %s. "
+                "Using default value: '%s'",
+                option_name,
+                siri.args->config,
+                "error: expecting a string value",
+                dest);
+    }
+    else
+    {
+        *dest = 0;
+
+        /* keep space left for a terminator char */
+        strncpy(dest,
+                option->val->string,
+                SIRI_PATH_MAX - 1);
+
+        len = strlen(dest);
+
+        if (len == SIRI_PATH_MAX - 1)
+        {
+            log_warning(
+                    "Default '%s' path exceeds %d characters, please "
+                    "check your configuration file: %s",
+                    option_name,
+                    SIRI_PATH_MAX - 2,
+                    siri.args->config);
+        }
+    }
+}
+
 static void SIRI_CFG_read_default_db_path(cfgparser_t * cfgparser)
 {
     cfgparser_option_t * option;
index 377575b9e4e23ea0e4efc1b86492e3a073d59910..863243d10ad46d63c61ca04ec5b2a5d9c7a2f894 100644 (file)
@@ -58,8 +58,18 @@ cproto_server_t siridb_auth_user_request(
         return CPROTO_ERR_AUTH_CREDENTIALS;
     }
 
-    ((sirinet_socket_t *) client->data)->siridb = siridb;
-    ((sirinet_socket_t *) client->data)->origin = user;
+    switch (client->type)
+    {
+    case UV_TCP:
+        ((sirinet_socket_t *) client->data)->siridb = siridb;
+        ((sirinet_socket_t *) client->data)->origin = user;
+        break;
+    case UV_NAMED_PIPE:
+        ((sirinet_pipe_t *) client->data)->siridb = siridb;
+        ((sirinet_pipe_t *) client->data)->origin = user;
+        break;
+    }
+
     siridb_user_incref(user);
 
     return CPROTO_RES_AUTH_SUCCESS;
@@ -116,8 +126,17 @@ bproto_server_t siridb_auth_server_request(
         return BPROTO_AUTH_ERR_UNKNOWN_UUID;
     }
 
-    ((sirinet_socket_t *) client->data)->siridb = siridb;
-    ((sirinet_socket_t *) client->data)->origin = server;
+    switch (client->type)
+    {
+    case UV_TCP:
+        ((sirinet_socket_t *) client->data)->siridb = siridb;
+        ((sirinet_socket_t *) client->data)->origin = server;
+        break;
+    case UV_NAMED_PIPE:
+        ((sirinet_pipe_t *) client->data)->siridb = siridb;
+        ((sirinet_pipe_t *) client->data)->origin = server;
+        break;
+    }
 
     free(server->version);
     server->version = strdup((const char *) qp_version->via.raw);
@@ -127,4 +146,3 @@ bproto_server_t siridb_auth_server_request(
 
     return BPROTO_AUTH_SUCCESS;
 }
-
index 571831a80bb4c2c6a20f2f104988a9131fce89a2..714ee503231ddf8447257774bc17633601a408c5 100644 (file)
@@ -22,7 +22,7 @@
 #include <siri/err.h>
 #include <siri/net/promises.h>
 #include <siri/net/protocol.h>
-#include <siri/net/socket.h>
+#include <siri/net/clserver.h>
 #include <siri/siri.h>
 #include <stdio.h>
 #include <string.h>
@@ -280,7 +280,7 @@ int siridb_insert_points_to_pools(siridb_insert_t * insert, size_t npoints)
     insert->npoints= npoints;
 
     /* increment the client reference counter */
-    sirinet_socket_incref(insert->client);
+    sirinet_client_incref(insert->client);
 
     uv_async_init(siri.loop, handle, INSERT_points_to_pools);
     handle->data = (void *) insert;
@@ -341,7 +341,7 @@ int insert_init_backend_local(
     }
     qp_unpacker_init(&ilocal->unpacker, promise->pkg->data, promise->pkg->len);
 
-    sirinet_socket_incref(client);
+    sirinet_client_incref(client);
     promise->data = client;
 
     promise->cb = (sirinet_promise_cb) INSERT_local_promise_backend_cb;
@@ -376,8 +376,7 @@ static void INSERT_on_response(slist_t * promises, uv_async_t * handle)
         sirinet_pkg_t * pkg;
         sirinet_promise_t * promise;
         siridb_insert_t * insert = (siridb_insert_t *) handle->data;
-        siridb_t * siridb =
-                ((sirinet_socket_t *) insert->client->data)->siridb;
+        CLIENT_SIRIDB(insert->client, siridb)
 
         int n = 0;
         char msg[MAX_INSERT_MSG];
@@ -998,7 +997,7 @@ static void INSERT_local_promise_backend_cb(
     {
         sirinet_pkg_send(client, pkg);
     }
-    sirinet_socket_decref(client);
+    sirinet_client_decref(client);
     sirinet_promise_decref(promise);
 }
 
@@ -1085,7 +1084,8 @@ static int INSERT_init_local(
 static void INSERT_points_to_pools(uv_async_t * handle)
 {
     siridb_insert_t * insert = (siridb_insert_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) insert->client->data)->siridb;
+    CLIENT_SIRIDB(insert->client, siridb)
+
     uint16_t pool = siridb->server->pool;
     sirinet_pkg_t * pkg, * repl_pkg;
     sirinet_promises_t * promises = sirinet_promises_new(
@@ -1472,7 +1472,7 @@ static void INSERT_free(uv_handle_t * handle)
     siridb_insert_t * insert = (siridb_insert_t *) handle->data;
 
     /* decrement the client reference counter */
-    sirinet_socket_decref(insert->client);
+    sirinet_client_decref(insert->client);
 
     /* free insert */
     siridb_insert_free(insert);
@@ -1481,5 +1481,3 @@ static void INSERT_free(uv_handle_t * handle)
     free((uv_async_t *) handle);
 
 }
-
-
index 55fa4ae1fbd6b79f7044be55cd627a4bce13d58d..2dd700a6af02bb8fed5fb38e246d1e6c8bf97ddb 100644 (file)
@@ -23,7 +23,7 @@
 #include <siri/db/walker.h>
 #include <siri/net/clserver.h>
 #include <siri/net/pkg.h>
-#include <siri/net/socket.h>
+#include <siri/net/clserver.h>
 #include <siri/parser/listener.h>
 #include <siri/parser/queries.h>
 #include <siri/siri.h>
@@ -70,7 +70,6 @@ void siridb_query_run(
         float factor,
         int flags)
 {
-    siridb_t * siridb;
     uv_async_t * handle = (uv_async_t *) malloc(sizeof(uv_async_t));
     if (handle == NULL)
     {
@@ -104,7 +103,7 @@ void siridb_query_run(
     query->pid = pid;
 
     /* increment client reference counter */
-    sirinet_socket_incref(client);
+    sirinet_client_incref(client);
 
     query->client = client;
     query->flags = flags;
@@ -132,8 +131,9 @@ void siridb_query_run(
         log_debug("Parsing query (%d): %s", query->flags, query->q);
     }
 
+    CLIENT_SIRIDB(query->client, siridb)
+
     /* increment active tasks */
-    siridb = ((sirinet_socket_t *) query->client->data)->siridb;
     siridb_tasks_inc(siridb->tasks);
 
     /* send next call */
@@ -145,7 +145,7 @@ void siridb_query_run(
 void siridb_query_free(uv_handle_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
 
     /* decrement active tasks */
     siridb_tasks_dec(siridb->tasks);
@@ -174,7 +174,7 @@ void siridb_query_free(uv_handle_t * handle)
     }
 
     /* decrement client reference counter */
-    sirinet_socket_decref(query->client);
+    sirinet_client_decref(query->client);
 
     /* free query */
     free(query);
@@ -252,7 +252,7 @@ void siridb_query_forward(
         int flags)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
 
     /*
      * the size is important here, we will use the alloc_size to guess the
@@ -534,7 +534,8 @@ static void QUERY_send_no_query(uv_async_t * handle)
 
 #ifndef DEBUG
     /* production version returns timestamp now */
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
+
     qp_add_raw(query->packer, (const unsigned char *) "calc", 4);
     uint64_t ts = siridb_time_now(siridb, query->start);
 
@@ -564,7 +565,8 @@ static void QUERY_parse(uv_async_t * handle)
 {
     int rc;
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
+
     siridb_walker_t * walker = siridb_walker_new(
             siridb,
             siridb_time_now(siridb, query->start),
@@ -654,8 +656,10 @@ static int QUERY_to_packer(qp_packer_t * packer, siridb_query_t * query)
         char buffer[packer->alloc_size];
         size_t size = packer->alloc_size;
 
+        CLIENT_SIRIDB(query->client, siridb)
+
         rc = QUERY_rebuild(
-                ((sirinet_socket_t *) query->client->data)->siridb,
+                siridb,
                 query->pr->tree->children->node,
                 buffer,
                 &size,
index 7e2dd4ef53ea8a06e1338f4c01e56f100895f007..5fb6fb5c118a49b9299170df3ee331d7be305ac7 100644 (file)
@@ -764,4 +764,3 @@ static void on_disable_backup_mode(uv_stream_t * client, sirinet_pkg_t * pkg)
         sirinet_pkg_send(client, package);
     }
 }
-
index 8c4e75b8bca1046e94ba5926a0a9cbb8f1621cae..dcfc3b2e7efcc81c45324f4da7cdc8721f3964e6 100644 (file)
@@ -31,7 +31,6 @@
 #include <siri/net/clserver.h>
 #include <siri/net/promises.h>
 #include <siri/net/protocol.h>
-#include <siri/net/socket.h>
 #include <siri/siri.h>
 #include <siri/version.h>
 #include <stdbool.h>
@@ -49,11 +48,10 @@ const unsigned long int WARNING_PKG_SIZE = RESET_BUF_SIZE;
  */
 #define MAX_QUERY_PKG_SIZE 65535
 
-
 #define DEFAULT_BACKLOG 128
-#define CHECK_SIRIDB(ssocket)                                                  \
-sirinet_socket_t * ssocket = client->data;                                     \
-if (ssocket->siridb == NULL)                                                   \
+#define CHECK_SIRIDB(client, siridb)                                           \
+CLIENT_SIRIDB(client, siridb)                                                  \
+if ((siridb) == NULL)                                                          \
 {                                                                              \
     sirinet_pkg_t * package;                                                   \
     package = sirinet_pkg_new(pkg->pid, 0, CPROTO_ERR_NOT_AUTHENTICATED, NULL);\
@@ -69,10 +67,15 @@ static const int SERVER_RUNNING_REINDEXING =
 
 static uv_loop_t * loop = NULL;
 static struct sockaddr_storage client_addr;
-static uv_tcp_t client_server;
+static uv_tcp_t client_server_tcp;
+static uv_pipe_t client_server_pipe;
 
 static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg);
-static void on_new_connection(uv_stream_t * server, int status);
+static void on_tcp_data(uv_stream_t * client, sirinet_pkg_t * pkg);
+static void on_tcp_new_connection(uv_stream_t * server, int status);
+static void on_pipe_data(uv_stream_t * client, sirinet_pkg_t * pkg);
+static void on_pipe_free(uv_stream_t * client);
+static void on_pipe_new_connection(uv_stream_t * server, int status);
 static void on_auth_request(uv_stream_t * client, sirinet_pkg_t * pkg);
 static void on_query(uv_stream_t * client, sirinet_pkg_t * pkg);
 static void on_insert(uv_stream_t * client, sirinet_pkg_t * pkg);
@@ -116,10 +119,11 @@ int sirinet_clserver_init(siri_t * siri)
     /* bind loop to the given loop */
     loop = siri->loop;
 
-    uv_tcp_init(loop, &client_server);
+    uv_tcp_init(loop, &client_server_tcp);
+    uv_pipe_init(loop, &client_server_pipe, 0);
 
     /* make sure data is set to NULL so we later on can check this value. */
-    client_server.data = NULL;
+    client_server_tcp.data = NULL;
 
     if (siri->cfg->bind_client_addr != NULL)
     {
@@ -155,40 +159,75 @@ int sirinet_clserver_init(siri_t * siri)
                 (struct sockaddr_in *) &client_addr);
     }
 
-    uv_tcp_bind(
-            &client_server,
+    rc = uv_tcp_bind(
+            &client_server_tcp,
             (const struct sockaddr *) &client_addr,
             (siri->cfg->ip_support == IP_SUPPORT_IPV6ONLY) ?
                     UV_TCP_IPV6ONLY : 0);
 
+    if (rc)
+    {
+        log_error("Error binding TCP client server: %s", uv_strerror(rc));
+        return 1;
+    }
+
     rc = uv_listen(
-            (uv_stream_t*) &client_server,
+            (uv_stream_t*) &client_server_tcp,
             DEFAULT_BACKLOG,
-            on_new_connection);
+            on_tcp_new_connection);
 
     if (rc)
     {
-        log_error("Error listening client server: %s", uv_strerror(rc));
+        log_error("Error listening TCP client server: %s", uv_strerror(rc));
         return 1;
     }
 
-    log_info("Start listening for client connections on port %d",
+    log_info("Start listening for TCP client connections on port %d",
             siri->cfg->listen_client_port);
 
+    if (siri->cfg->pipe_support)
+    {
+        char *pipe_name = siri->cfg->pipe_client_name;
+
+        rc = uv_pipe_bind(
+                &client_server_pipe,
+                pipe_name);
+
+        if (rc)
+        {
+            log_error("Error binding pipe client server: %s", uv_strerror(rc));
+            return 1;
+        }
+
+        rc = uv_listen(
+                (uv_stream_t*) &client_server_pipe,
+                DEFAULT_BACKLOG,
+                on_pipe_new_connection);
+
+        if (rc)
+        {
+            log_error("Error listening TCP client server: %s", uv_strerror(rc));
+            return 1;
+        }
+
+        log_info("Start listening for pipe client connections on '%s'",
+                pipe_name);
+    }
+
     return 0;
 }
 
-static void on_new_connection(uv_stream_t * server, int status)
+static void on_tcp_new_connection(uv_stream_t * server, int status)
 {
-    log_debug("Received a client connection request.");
+    log_debug("Received a TCP client connection request.");
 
     if (status < 0)
     {
-        log_error("Client connection error: %s", uv_strerror(status));
+        log_error("TCP client connection error: %s", uv_strerror(status));
         return;
     }
     uv_tcp_t * client =
-            sirinet_socket_new(SOCKET_CLIENT, (on_data_cb_t) &on_data);
+            sirinet_socket_new(SOCKET_CLIENT, (on_data_cb_t) &on_tcp_data);
 
     if (client != NULL)
     {
@@ -208,37 +247,41 @@ static void on_new_connection(uv_stream_t * server, int status)
     }
 }
 
-static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg)
+static void on_pipe_new_connection(uv_stream_t * server, int status)
 {
-    if (Logger.level == LOGGER_DEBUG)
+    log_debug("Received a pipe client connection request.");
+
+    if (status < 0)
     {
-        char addr_port[ADDR_BUF_SZ];
-        if (sirinet_addr_and_port(addr_port, client) == 0)
-        {
-            log_debug(
-                    "Package received from client '%s' "
-                    "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %s)",
-                    addr_port,
-                    pkg->pid,
-                    pkg->len,
-                    sirinet_cproto_client_str(pkg->tp));
-        }
+        log_error("Pipe client connection error: %s", uv_strerror(status));
+        return;
     }
-    else if (pkg->len >= WARNING_PKG_SIZE)
+    uv_pipe_t * client =
+            sirinet_pipe_new(
+                    PIPE_CLIENT,
+                    (on_data_cb_t) &on_pipe_data,
+                    (on_free_cb_t) &on_pipe_free);
+
+    if (client != NULL)
     {
-        char addr_port[ADDR_BUF_SZ];
-        if (sirinet_addr_and_port(addr_port, client) == 0)
+        uv_pipe_init(loop, client, 0);
+
+        if (uv_accept(server, (uv_stream_t *) client) == 0)
         {
-            log_warning(
-                    "Got a large package from '%s' (pid: %d, len: %d, tp: %s)."
-                    " A package size smaller than 1MB is recommended!",
-                    addr_port,
-                    pkg->pid,
-                    pkg->len,
-                    sirinet_cproto_client_str(pkg->tp));
+            uv_read_start(
+                    (uv_stream_t *) client,
+                    sirinet_pipe_alloc_buffer,
+                    sirinet_pipe_on_data);
+        }
+        else
+        {
+            sirinet_pipe_decref(client);
         }
     }
+}
 
+static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg)
+{
     /* in case the online flag is not set, we cannot perform any request */
     if (siri.status == SIRI_STATUS_RUNNING)
     {
@@ -284,14 +327,94 @@ static void on_data(uv_stream_t * client, sirinet_pkg_t * pkg)
     }
     else
     {
-        /* data->siridb can be NULL here, make sure we can handle this state */
+        CLIENT_SIRIDB(client, siridb)
+
+        /* siridb can be NULL here, make sure we can handle this state */
         CLSERVER_send_server_error(
-                ((sirinet_socket_t *) client->data)->siridb,
+                siridb,
                 client,
                 pkg);
     }
 }
 
+static void on_tcp_data(uv_stream_t * client, sirinet_pkg_t * pkg)
+{
+    if (Logger.level == LOGGER_DEBUG)
+    {
+        char addr_port[ADDR_BUF_SZ];
+        if (sirinet_addr_and_port(addr_port, client) == 0)
+        {
+            log_debug(
+                    "Package received from client '%s' "
+                    "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %s)",
+                    addr_port,
+                    pkg->pid,
+                    pkg->len,
+                    sirinet_cproto_client_str(pkg->tp));
+        }
+    }
+    else if (pkg->len >= WARNING_PKG_SIZE)
+    {
+        char addr_port[ADDR_BUF_SZ];
+        if (sirinet_addr_and_port(addr_port, client) == 0)
+        {
+            log_warning(
+                    "Got a large package from '%s' (pid: %d, len: %d, tp: %s)."
+                    " A package size smaller than 1MB is recommended!",
+                    addr_port,
+                    pkg->pid,
+                    pkg->len,
+                    sirinet_cproto_client_str(pkg->tp));
+        }
+    }
+
+    on_data(client, pkg);
+}
+
+static void on_pipe_data(uv_stream_t * client, sirinet_pkg_t * pkg)
+{
+    if (Logger.level == LOGGER_DEBUG)
+    {
+        char pipe_name[PIPE_NAME_SZ];
+        if (sirinet_pipe_name(pipe_name, client) == 0)
+        {
+            log_debug(
+                    "Package received from client '%s' "
+                    "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %s)",
+                    pipe_name,
+                    pkg->pid,
+                    pkg->len,
+                    sirinet_cproto_client_str(pkg->tp));
+        }
+    }
+    else if (pkg->len >= WARNING_PKG_SIZE)
+    {
+        char pipe_name[PIPE_NAME_SZ];
+        if (sirinet_pipe_name(pipe_name, client) == 0)
+        {
+            log_warning(
+                    "Got a large package from '%s' (pid: %d, len: %d, tp: %s)."
+                    " A package size smaller than 1MB is recommended!",
+                    pipe_name,
+                    pkg->pid,
+                    pkg->len,
+                    sirinet_cproto_client_str(pkg->tp));
+        }
+    }
+
+    on_data(client, pkg);
+}
+
+static void on_pipe_free(uv_stream_t * client)
+{
+    char pipe_name[PIPE_NAME_SZ];
+    if (sirinet_pipe_name(pipe_name, client) == 0)
+    {
+        uv_fs_t req;
+        uv_fs_unlink(loop, &req, pipe_name, NULL);
+    }
+}
+
 static void on_auth_request(uv_stream_t * client, sirinet_pkg_t * pkg)
 {
     cproto_server_t rc;
@@ -395,7 +518,7 @@ static void CLSERVER_send_pool_error(
 
 static void on_query(uv_stream_t * client, sirinet_pkg_t * pkg)
 {
-    CHECK_SIRIDB(ssocket)
+    CHECK_SIRIDB(client, siridb)
 
     if (pkg->len > MAX_QUERY_PKG_SIZE)
     {
@@ -436,12 +559,12 @@ static void on_query(uv_stream_t * client, sirinet_pkg_t * pkg)
 
         if (qp_time_precision.tp == QP_INT64 &&
                 (tp = (siridb_timep_t) qp_time_precision.via.int64) !=
-                ssocket->siridb->time->precision)
+                siridb->time->precision)
         {
             tp %= SIRIDB_TIME_END;
         }
         factor = (tp == SIRIDB_TIME_DEFAULT) ? 0.0 :
-                pow(1000.0, tp - ssocket->siridb->time->precision);
+                pow(1000.0, tp - siridb->time->precision);
 
         siridb_query_run(
                 pkg->pid,
@@ -459,12 +582,13 @@ static void on_query(uv_stream_t * client, sirinet_pkg_t * pkg)
 
 static void on_insert(uv_stream_t * client, sirinet_pkg_t * pkg)
 {
-    CHECK_SIRIDB(ssocket)
+    CHECK_SIRIDB(client, siridb)
+    CLIENT_USER(client, siridb_user)
 
     char err_msg[SIRIDB_MAX_SIZE_ERR_MSG];
 
     if (!siridb_user_check_access(
-            (siridb_user_t *) ssocket->origin,
+            siridb_user,
             SIRIDB_ACCESS_INSERT,
             err_msg))
     {
@@ -486,8 +610,6 @@ static void on_insert(uv_stream_t * client, sirinet_pkg_t * pkg)
         return;
     }
 
-    siridb_t * siridb = ssocket->siridb;
-
     /* only when when the flag is EXACTLY running or
      * running + re-indexing we can continue */
     if (    siridb->server->flags != SERVER_FLAG_RUNNING &&
@@ -661,9 +783,9 @@ static void on_reqfile(
         sirinet_pkg_t * pkg,
         sirinet_clserver_getfile getfile)
 {
-    CHECK_SIRIDB(ssocket)
+    CHECK_SIRIDB(client, siridb)
+    CLIENT_USER(client, siridb_user)
 
-    siridb_t * siridb = ssocket->siridb;
     sirinet_pkg_t * package = NULL;
     char err_msg[SIRIDB_MAX_SIZE_ERR_MSG];
 
@@ -681,7 +803,7 @@ static void on_reqfile(
                 err_msg);
     }
     else if (!siridb_user_check_access(
-            (siridb_user_t *) ssocket->origin,
+            siridb_user,
             SIRIDB_ACCESS_PROFILE_FULL,
             err_msg))
     {
@@ -720,9 +842,9 @@ static void on_reqfile(
  */
 static void on_register_server(uv_stream_t * client, sirinet_pkg_t * pkg)
 {
-    CHECK_SIRIDB(ssocket)
+    CHECK_SIRIDB(client, siridb)
+    CLIENT_USER(client, siridb_user)
 
-    siridb_t * siridb = ssocket->siridb;
     sirinet_pkg_t * package = NULL;
     siridb_server_t * new_server = NULL;
     char err_msg[SIRIDB_MAX_SIZE_ERR_MSG];
@@ -754,7 +876,7 @@ static void on_register_server(uv_stream_t * client, sirinet_pkg_t * pkg)
                 err_msg);
     }
     else if (!siridb_user_check_access(
-            (siridb_user_t *) ssocket->origin,
+            siridb_user,
             SIRIDB_ACCESS_PROFILE_FULL,
             err_msg))
     {
@@ -828,7 +950,7 @@ static void on_register_server(uv_stream_t * client, sirinet_pkg_t * pkg)
             if (servers != NULL && (package = sirinet_pkg_dup(pkg)) != NULL)
             {
                 /* make sure to decrement the client in the callback */
-                sirinet_socket_incref(client);
+                sirinet_client_incref(client);
 
                 siridb_servers_send_pkg(
                         servers,
@@ -988,9 +1110,8 @@ static void CLSERVER_on_register_server_response(
     }
 
     /* decref the client */
-    sirinet_socket_decref(server_reg->client);
+    sirinet_client_decref(server_reg->client);
 
     /* free server register object */
     free(server_reg);
 }
-
diff --git a/src/siri/net/pipe.c b/src/siri/net/pipe.c
new file mode 100644 (file)
index 0000000..fd8c7c7
--- /dev/null
@@ -0,0 +1,245 @@
+#include <assert.h>
+#include <logger/logger.h>
+#include <siri/admin/client.h>
+#include <siri/err.h>
+#include <siri/net/protocol.h>
+#include <siri/net/pipe.h>
+#include <siri/siri.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define MAX_ALLOWED_PKG_SIZE 20971520      /* 20 MB  */
+
+#define QUIT_PIPE                     \
+    free(spipe->buf);                 \
+    spipe->buf = NULL;                \
+    spipe->len = 0;                   \
+    spipe->size = 0;                  \
+    spipe->on_data = NULL;            \
+    sirinet_pipe_decref(client);      \
+    return;
+
+/*
+ * This function can raise a SIGNAL.
+ */
+void sirinet_pipe_alloc_buffer(
+        uv_handle_t * handle,
+        size_t suggested_size,
+        uv_buf_t * buf)
+{
+    sirinet_pipe_t * spipe = (sirinet_pipe_t *) handle->data;
+
+    if (!spipe->len && spipe->size > RESET_BUF_SIZE)
+    {
+        free(spipe->buf);
+        spipe->buf = (char *) malloc(suggested_size);
+        if (spipe->buf == NULL)
+        {
+            ERR_ALLOC
+            buf->len = 0;
+            return;
+        }
+        spipe->size = suggested_size;
+        spipe->len = 0;
+    }
+    buf->base = spipe->buf + spipe->len;
+    buf->len = spipe->size - spipe->len;
+}
+
+/*
+ * Buffer should have a size of PIPE_NAME_SZ
+ *
+ * Return 0 if successful or -1 in case of an error.
+ */
+int sirinet_pipe_name(char * buffer, uv_stream_t * client)
+{
+    size_t len = PIPE_NAME_SZ - 1;
+
+    if (uv_pipe_getsockname(
+            (uv_pipe_t *) client,
+            buffer,
+            &len))
+    {
+        return -1;
+    }
+
+    buffer[len] = 0;
+    return 0;
+}
+
+/*
+ * This function can raise a SIGNAL.
+ */
+void sirinet_pipe_on_data(
+        uv_stream_t * client,
+        ssize_t nread,
+        const uv_buf_t * buf)
+{
+    sirinet_pipe_t * spipe = (sirinet_pipe_t *) client->data;
+    sirinet_pkg_t * pkg;
+    size_t total_sz;
+    uint8_t check;
+
+    /*
+     * spipe->on_data is NULL when 'sirinet_pipe_decref' is called from
+     * within this function. We should never call 'sirinet_pipe_decref' twice
+     * so the best thing is to log and and exit this function.
+     */
+    if (spipe->on_data == NULL)
+    {
+        char pipe_name[PIPE_NAME_SZ];
+        if (sirinet_pipe_name(pipe_name, client) == 0)
+        {
+            log_error(
+                    "Received data from '%s' but we ignore the data since the "
+                    "connection will be closed in a few seconds...",
+                pipe_name);
+        }
+        return;
+    }
+
+    if (nread < 0)
+    {
+        if (nread != UV_EOF)
+        {
+            log_error("Read error: %s", uv_err_name(nread));
+        }
+        QUIT_PIPE
+    }
+
+    spipe->len += nread;
+
+    if (spipe->len < sizeof(sirinet_pkg_t))
+    {
+        return;
+    }
+
+    pkg = (sirinet_pkg_t *) spipe->buf;
+    check = pkg->tp ^ 255;
+    if (    check != pkg->checkbit ||
+            (spipe->tp == PIPE_CLIENT && pkg->len > MAX_ALLOWED_PKG_SIZE))
+    {
+        char pipe_name[PIPE_NAME_SZ];
+        if (sirinet_pipe_name(pipe_name, client) == 0)
+        {
+            log_error(
+                "Got an illegal package or size too large from '%s', "
+                "closing connection "
+                "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %" PRIu8 ")",
+                pipe_name, pkg->pid, pkg->len, pkg->tp);
+        }
+        QUIT_PIPE
+    }
+
+    total_sz = sizeof(sirinet_pkg_t) + pkg->len;
+    if (spipe->len < total_sz)
+    {
+        if (spipe->size < total_sz)
+        {
+            char * tmp = realloc(spipe->buf, total_sz);
+            if (tmp == NULL)
+            {
+                log_critical(
+                    "Cannot allocate size for package "
+                    "(pid: %" PRIu16 ", len: %" PRIu32 ", tp: %" PRIu8 ")",
+                    pkg->pid, pkg->len, pkg->tp);
+                QUIT_PIPE
+            }
+            spipe->buf = tmp;
+            spipe->size = total_sz;
+        }
+        return;
+    }
+
+    /* call on-data function */
+    (*spipe->on_data)(client, pkg);
+
+    spipe->len -= total_sz;
+
+    if (spipe->len > 0)
+    {
+        /* move data and call sirinet_pipe_on_data() function again */
+        memmove(spipe->buf, spipe->buf + total_sz, spipe->len);
+        sirinet_pipe_on_data(client, 0, buf);
+    }
+}
+
+/*
+ * Returns NULL and raises a SIGNAL in case an error has occurred.
+ *
+ * Note: ((sirinet_pipe_t *) pipe->data)->ref is initially set to 1
+ */
+uv_pipe_t * sirinet_pipe_new(
+        sirinet_pipe_tp_t tp,
+        on_data_cb_t cb_data,
+        on_free_cb_t cb_free)
+{
+    sirinet_pipe_t * spipe =
+            (sirinet_pipe_t *) malloc(sizeof(sirinet_pipe_t));
+
+    if (spipe == NULL)
+    {
+        ERR_ALLOC
+        return NULL;
+    }
+
+    spipe->tp = tp;
+    spipe->on_data = cb_data;
+    spipe->on_free = cb_free;
+    spipe->buf = NULL;
+    spipe->len = 0;
+    spipe->size = -1; /* this will force allocating on first request */
+    spipe->origin = NULL;
+    spipe->siridb = NULL;
+    spipe->ref = 1;
+    spipe->pipe.data = spipe;
+
+    return &spipe->pipe;
+}
+
+/*
+ * Never use this function but call sirinet_pipe_decref.
+ * Destroy pipe. (parsing NULL is not allowed)
+ *
+ * We know three different pipe types:
+ *  - client: used for clients. a user object might be destroyed.
+ *  - back-end: used to connect to other servers. a server might be destroyed.
+ *  - server: user for severs connecting to here. a server might be destroyed.
+ *
+ *  In case a server is destroyed, remaining promises will be cancelled and
+ *  the call-back functions will be called.
+ */
+void sirinet__pipe_free(uv_stream_t * client)
+{
+    sirinet_pipe_t * spipe = client->data;
+
+#if DEBUG
+    log_debug("Free pipe type: %d", spipe->tp);
+#endif
+
+    switch (spipe->tp)
+    {
+    case PIPE_CLIENT:  /* listens to client connections  */
+        if (spipe->origin != NULL)
+        {
+            siridb_user_t * user = (siridb_user_t *) spipe->origin;
+            siridb_user_decref(user);
+        }
+        break;
+    case PIPE_BACKEND:  /* listens to server connections  */
+        if (spipe->origin != NULL)
+        {
+            siridb_server_t * server = (siridb_server_t *) spipe->origin;
+            siridb_server_decref(server);
+        }
+        break;
+    }
+
+    if (spipe->on_free != NULL)
+    {
+        spipe->on_free(client);
+    }
+
+    free(spipe->buf);
+    free(spipe);
+}
index 3dd006096a3c3264d7ef664af3ca9ebfab318f43..bb6d934813081d33a250630ea22e3ab0ffd9c9bd 100644 (file)
@@ -13,7 +13,7 @@
 #include <logger/logger.h>
 #include <siri/err.h>
 #include <siri/net/pkg.h>
-#include <siri/net/socket.h>
+#include <siri/net/clserver.h>
 #include <stddef.h>
 #include <stdlib.h>
 #include <string.h>
@@ -167,7 +167,7 @@ int sirinet_pkg_send(uv_stream_t * client, sirinet_pkg_t * pkg)
     }
 
     /* increment client reference counter */
-    sirinet_socket_incref(client);
+    sirinet_client_incref(client);
 
     data->client = client;
     data->pkg = pkg;
@@ -213,7 +213,7 @@ static void PKG_write_cb(uv_write_t * req, int status)
 
     pkg_send_t * data = (pkg_send_t *) req->data;
 
-    sirinet_socket_decref(data->client);
+    sirinet_client_decref(data->client);
 
     free(data->pkg);
     free(data);
index c24f8f027046d366c46ba8b9b583c541b6e58d1b..44f3b92cb799d4f54fdc4237548bfb58e67a7b23 100644 (file)
@@ -35,7 +35,7 @@
 #include <siri/help/help.h>
 #include <siri/net/promises.h>
 #include <siri/net/protocol.h>
-#include <siri/net/socket.h>
+#include <siri/net/clserver.h>
 #include <siri/parser/listener.h>
 #include <siri/parser/queries.h>
 #include <siri/siri.h>
@@ -381,10 +381,10 @@ else                                    \
 /*
  * Start SIRIPARSER_MASTER_CHECK_ACCESS
  */
-#define SIRIPARSER_MASTER_CHECK_ACCESS(ACCESS_BIT)                            \
+#define SIRIPARSER_MASTER_CHECK_ACCESS(user, ACCESS_BIT)                      \
 if (IS_MASTER &&                                                              \
     !siridb_user_check_access(                                                \
-        (siridb_user_t *) ((sirinet_socket_t *) query->client->data)->origin, \
+        user,                                                                 \
         ACCESS_BIT,                                                           \
         query->err_msg))                                                      \
 {                                                                             \
@@ -512,7 +512,7 @@ static void enter_access_expr(uv_async_t * handle)
 static void enter_alter_group(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_alter_t * q_alter = (query_alter_t *) query->data;
 
     MASTER_CHECK_ACCESSIBLE(siridb)
@@ -545,7 +545,7 @@ static void enter_alter_group(uv_async_t * handle)
 static void enter_alter_server(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_alter_t * q_alter = (query_alter_t *) query->data;
     siridb_server_t * server = siridb_server_from_node(
             siridb,
@@ -578,7 +578,8 @@ static void enter_alter_stmt(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
 
-    SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_ALTER)
+    CLIENT_USER(query->client, siridb_user)
+    SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_ALTER)
 
 #if DEBUG
     assert (query->packer == NULL);
@@ -607,7 +608,7 @@ static void enter_alter_stmt(uv_async_t * handle)
 static void enter_alter_user(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
 
     MASTER_CHECK_ACCESSIBLE(siridb)
 
@@ -641,7 +642,8 @@ static void enter_count_stmt(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
 
-    SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_COUNT)
+    CLIENT_USER(query->client, siridb_user)
+    SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_COUNT)
 
 #if DEBUG
     assert (query->packer == NULL);
@@ -672,7 +674,8 @@ static void enter_create_stmt(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
 
-    SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_CREATE)
+    CLIENT_USER(query->client, siridb_user)
+    SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_CREATE)
 
     SIRIPARSER_NEXT_NODE
 }
@@ -712,7 +715,8 @@ static void enter_drop_stmt(uv_async_t * handle)
     assert (query->packer == NULL);
 #endif
 
-    SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_DROP)
+    CLIENT_USER(query->client, siridb_user)
+    SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_DROP)
 
     query->packer = sirinet_packer_new(1024);
 
@@ -738,9 +742,9 @@ static void enter_drop_stmt(uv_async_t * handle)
 static void enter_grant_user(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
-
-    SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_GRANT)
+    CLIENT_SIRIDB(query->client, siridb)
+    CLIENT_USER(query->client, siridb_user)
+    SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_GRANT)
     MASTER_CHECK_ACCESSIBLE(siridb)
 
     cleri_node_t * user_node =
@@ -778,7 +782,7 @@ static void enter_grant_user(uv_async_t * handle)
 static void enter_group_match(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     cleri_node_t * node = query->nodes->node;
     query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
 
@@ -870,7 +874,7 @@ static void enter_help(uv_async_t * handle)
 static void enter_limit_expr(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_list_t * qlist = (query_list_t *) query->data;
     int64_t limit = query->nodes->node->children->next->node->result;
 
@@ -896,7 +900,8 @@ static void enter_list_stmt(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
 
-    SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_LIST)
+    CLIENT_USER(query->client, siridb_user)
+    SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_LIST)
 
 #if DEBUG
     assert (query->packer == NULL);
@@ -960,9 +965,9 @@ static void enter_merge_as(uv_async_t * handle)
 static void enter_revoke_user(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
-
-    SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_REVOKE)
+    CLIENT_SIRIDB(query->client, siridb)
+    CLIENT_USER(query->client, siridb_user)
+    SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_REVOKE)
     MASTER_CHECK_ACCESSIBLE(siridb)
 
     cleri_node_t * user_node =
@@ -1004,12 +1009,13 @@ static void enter_revoke_user(uv_async_t * handle)
 static void enter_select_stmt(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_select_t * q_select;
     cleri_children_t * child;
     int skip_get_points;
 
-    SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_SELECT)
+    CLIENT_USER(query->client, siridb_user)
+    SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_SELECT)
     MASTER_CHECK_ACCESSIBLE(siridb)
 
 #if DEBUG
@@ -1064,7 +1070,7 @@ static void enter_select_stmt(uv_async_t * handle)
 static void enter_set_expression(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     cleri_node_t * node = query->nodes->node->children->next->next->node;
     query_alter_t * q_alter = (query_alter_t *) query->data;
 
@@ -1100,7 +1106,7 @@ static void enter_set_ignore_threshold(uv_async_t * handle)
 static void enter_set_name(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     cleri_node_t * name_node =
                 query->nodes->node->children->next->next->node;
 
@@ -1167,7 +1173,7 @@ static void enter_series_name(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
     cleri_node_t * node = query->nodes->node;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
     siridb_series_t * series = NULL;
     uint16_t pool;
@@ -1324,7 +1330,7 @@ static void enter_series_match(uv_async_t * handle)
 static void enter_series_all(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     siridb_series_t * series;
     query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
 
@@ -1383,7 +1389,7 @@ static void enter_series_all(uv_async_t * handle)
 static void enter_series_re(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     cleri_node_t * node = query->nodes->node;
     query_wrapper_t * q_wrapper = (query_wrapper_t *) query->data;
 
@@ -1561,9 +1567,9 @@ static void exit_after_expr(uv_async_t * handle)
 static void exit_alter_group(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
+    CLIENT_SIRIDB(query->client, siridb)
 
-    if (siridb_groups_save(
-            ((sirinet_socket_t *) query->client->data)->siridb->groups))
+    if (siridb_groups_save(siridb->groups))
     {
         FILE_ERR_RET
     }
@@ -1590,8 +1596,9 @@ static void exit_alter_group(uv_async_t * handle)
 static void exit_alter_user(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
+    CLIENT_SIRIDB(query->client, siridb)
 
-    if (siridb_users_save(((sirinet_socket_t *) query->client->data)->siridb))
+    if (siridb_users_save(siridb))
     {
         FILE_ERR_RET
     }
@@ -1687,7 +1694,7 @@ static void exit_calc_stmt(uv_async_t * handle)
 static void exit_count_groups(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_count_t * q_count = (query_count_t *) query->data;
 
     if (q_count->where_expr == NULL || !cexpr_contains(
@@ -1720,7 +1727,7 @@ static void exit_count_groups(uv_async_t * handle)
 static void exit_count_pools(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_count_t * q_count = (query_count_t *) query->data;
     siridb_pool_t * pool = siridb->pools->pool + siridb->server->pool;
 
@@ -1765,7 +1772,7 @@ static void exit_count_pools(uv_async_t * handle)
 static void exit_count_series(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_count_t * q_count = (query_count_t *) query->data;
 
     MASTER_CHECK_ONLINE(siridb)
@@ -1829,7 +1836,7 @@ static void exit_count_series(uv_async_t * handle)
 static void exit_count_series_length(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_count_t * q_count = (query_count_t *) query->data;
 
     MASTER_CHECK_ACCESSIBLE(siridb)
@@ -1916,7 +1923,7 @@ static void exit_count_series_length(uv_async_t * handle)
 static void exit_count_servers(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_count_t * q_count = (query_count_t *) query->data;
     cexpr_t * where_expr = q_count->where_expr;
     cexpr_cb_t cb = (cexpr_cb_t) siridb_server_cexpr_cb;
@@ -1972,7 +1979,7 @@ static void exit_count_servers(uv_async_t * handle)
 static void exit_count_servers_received(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_count_t * q_count = (query_count_t *) query->data;
     cexpr_t * where_expr = q_count->where_expr;
     cexpr_cb_t cb = (cexpr_cb_t) siridb_server_cexpr_cb;
@@ -2006,7 +2013,7 @@ static void exit_count_servers_received(uv_async_t * handle)
 static void exit_count_servers_selected(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_count_t * q_count = (query_count_t *) query->data;
     cexpr_t * where_expr = q_count->where_expr;
     cexpr_cb_t cb = (cexpr_cb_t) siridb_server_cexpr_cb;
@@ -2040,7 +2047,7 @@ static void exit_count_servers_selected(uv_async_t * handle)
 static void exit_count_shards(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_count_t * q_count = (query_count_t *) query->data;
 
     qp_add_raw(query->packer, (const unsigned char *) "shards", 6);
@@ -2111,7 +2118,7 @@ static void exit_count_shards(uv_async_t * handle)
 static void exit_count_shards_size(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_count_t * q_count = (query_count_t *) query->data;
     uint64_t duration;
     size_t i;
@@ -2174,7 +2181,7 @@ static void exit_count_shards_size(uv_async_t * handle)
 static void exit_count_users(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     llist_node_t * node = siridb->users->first;
     cexpr_t * where_expr = ((query_count_t *) query->data)->where_expr;
     cexpr_cb_t cb = (cexpr_cb_t) siridb_user_cexpr_cb;
@@ -2199,7 +2206,7 @@ static void exit_count_users(uv_async_t * handle)
 static void exit_create_group(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     cleri_node_t * name_nd =
             query->nodes->node->children->next->node;
 
@@ -2263,7 +2270,7 @@ static void exit_create_group(uv_async_t * handle)
 static void exit_create_user(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     siridb_user_t * user = ((query_alter_t *) query->data)->via.user;
     cleri_node_t * user_node =
             query->nodes->node->children->next->node;
@@ -2331,7 +2338,7 @@ static void exit_create_user(uv_async_t * handle)
 static void exit_drop_group(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
 
     MASTER_CHECK_ACCESSIBLE(siridb)
 
@@ -2370,7 +2377,7 @@ static void exit_drop_group(uv_async_t * handle)
 static void exit_drop_series(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_drop_t * q_drop = (query_drop_t *) query->data;
 
     MASTER_CHECK_ACCESSIBLE(siridb)
@@ -2481,7 +2488,7 @@ static void exit_drop_series(uv_async_t * handle)
 static void exit_drop_server(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     siridb_server_t * server = siridb_server_from_node(
             siridb,
             query->nodes->node->children->next->node->children->node,
@@ -2550,7 +2557,7 @@ static void exit_drop_server(uv_async_t * handle)
 static void exit_drop_shards(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_drop_t * q_drop = (query_drop_t *) query->data;
 
     MASTER_CHECK_ACCESSIBLE(siridb)
@@ -2651,7 +2658,7 @@ static void exit_drop_shards(uv_async_t * handle)
 static void exit_drop_user(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
 
     MASTER_CHECK_ACCESSIBLE(siridb)
 
@@ -2662,7 +2669,7 @@ static void exit_drop_user(uv_async_t * handle)
     strx_extract_string(username, user_node->str, user_node->len);
 
     if (siridb_users_drop_user(
-            ((sirinet_socket_t *) query->client->data)->siridb,
+            siridb,
             username,
             query->err_msg))
     {
@@ -2692,8 +2699,9 @@ static void exit_drop_user(uv_async_t * handle)
 static void exit_grant_user(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
+    CLIENT_SIRIDB(query->client, siridb)
 
-    if (siridb_users_save(((sirinet_socket_t *) query->client->data)->siridb))
+    if (siridb_users_save(siridb))
     {
         sprintf(query->err_msg, "Could not write users to file!");
         log_critical(query->err_msg);
@@ -2773,7 +2781,7 @@ static void exit_help_xxx(uv_async_t * handle)
 static void exit_list_groups(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_list_t * q_list = (query_list_t *) query->data;
 
     int is_local = (q_list->props == NULL);
@@ -2830,7 +2838,7 @@ static void exit_list_groups(uv_async_t * handle)
 static void exit_list_pools(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     query_list_t * q_list = (query_list_t *) query->data;
     siridb_pool_t * pool = siridb->pools->pool + siridb->server->pool;
     siridb_pool_walker_t wpool = {
@@ -2912,7 +2920,7 @@ static void exit_list_series(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
     query_list_t * q_list = (query_list_t *) query->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
 
     if (q_list->props == NULL)
     {
@@ -2965,7 +2973,8 @@ static void exit_list_series(uv_async_t * handle)
 static void exit_list_servers(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
+
     query_list_t * q_list = (query_list_t *) query->data;
     cexpr_t * where_expr = q_list->where_expr;
     int is_local = IS_MASTER;
@@ -3049,7 +3058,8 @@ static void exit_list_servers(uv_async_t * handle)
 static void exit_list_shards(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
+
     query_list_t * q_list = (query_list_t *) query->data;
     uint_fast16_t prop;
     uint64_t duration;
@@ -3183,8 +3193,9 @@ static void exit_list_shards(uv_async_t * handle)
 static void exit_list_users(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    llist_node_t * node =
-            ((sirinet_socket_t *) query->client->data)->siridb->users->first;
+    CLIENT_SIRIDB(query->client, siridb)
+
+    llist_node_t * node = siridb->users->first;
     slist_t * props = ((query_list_t *) query->data)->props;
     cexpr_cb_t cb = (cexpr_cb_t) siridb_user_cexpr_cb;
     query_list_t * q_list = (query_list_t *) query->data;
@@ -3242,8 +3253,9 @@ static void exit_list_users(uv_async_t * handle)
 static void exit_revoke_user(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
+    CLIENT_SIRIDB(query->client, siridb)
 
-    if (siridb_users_save(((sirinet_socket_t *) query->client->data)->siridb))
+    if (siridb_users_save(siridb))
     {
         sprintf(query->err_msg, "Could not write users to file!");
         log_critical(query->err_msg);
@@ -3519,9 +3531,9 @@ static void exit_select_stmt(uv_async_t * handle)
 static void exit_set_address(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
     siridb_server_t * server = ((query_alter_t *) query->data)->via.server;
     cleri_node_t * node = query->nodes->node->children->next->next->node;
+    CLIENT_SIRIDB(query->client, siridb)
 
     if (siridb->server == server || server->socket != NULL)
     {
@@ -3563,7 +3575,7 @@ static void exit_set_address(uv_async_t * handle)
 static void exit_set_backup_mode(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
 
 #if DEBUG
     assert (query->data != NULL);
@@ -3664,7 +3676,7 @@ static void exit_set_backup_mode(uv_async_t * handle)
 static void exit_set_drop_threshold(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
 
     MASTER_CHECK_ACCESSIBLE(siridb)
 
@@ -3725,7 +3737,7 @@ static void exit_set_drop_threshold(uv_async_t * handle)
 static void exit_set_list_limit(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
 
     MASTER_CHECK_ACCESSIBLE(siridb)
     MASTER_CHECK_VERSION(siridb, "2.0.17")
@@ -3788,8 +3800,8 @@ static void exit_set_list_limit(uv_async_t * handle)
 static void exit_set_log_level(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
     query_alter_t * q_alter = (query_alter_t *) query->data;
+    CLIENT_SIRIDB(query->client, siridb)
 
 #if DEBUG
     assert (query->data != NULL);
@@ -3931,9 +3943,9 @@ static void exit_set_log_level(uv_async_t * handle)
 static void exit_set_port(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
     siridb_server_t * server = ((query_alter_t *) query->data)->via.server;
     cleri_node_t * node = query->nodes->node->children->next->next->node;
+    CLIENT_SIRIDB(query->client, siridb)
 
     if (siridb->server == server || server->socket != NULL)
     {
@@ -3984,7 +3996,7 @@ static void exit_set_port(uv_async_t * handle)
 static void exit_set_select_points_limit(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
 
     MASTER_CHECK_ACCESSIBLE(siridb)
     MASTER_CHECK_VERSION(siridb, "2.0.17")
@@ -4047,8 +4059,8 @@ static void exit_set_select_points_limit(uv_async_t * handle)
 static void exit_set_timezone(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
     cleri_node_t * node = query->nodes->node->children->next->next->node;
+    CLIENT_SIRIDB(query->client, siridb)
 
     MASTER_CHECK_ACCESSIBLE(siridb)
 
@@ -4112,8 +4124,9 @@ static void exit_set_timezone(uv_async_t * handle)
 static void exit_show_stmt(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-
-    SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_SHOW)
+    CLIENT_SIRIDB(query->client, siridb)
+    CLIENT_USER(query->client, siridb_user)
+    SIRIPARSER_MASTER_CHECK_ACCESS(siridb_user, SIRIDB_ACCESS_SHOW)
 
     cleri_children_t * children =
             query->nodes->node->children->next->node->children;
@@ -4134,7 +4147,7 @@ static void exit_show_stmt(uv_async_t * handle)
     qp_add_raw(query->packer, (const unsigned char *) "data", 4);
     qp_add_type(query->packer, QP_ARRAY_OPEN);
 
-    siridb_user_t * user = ((sirinet_socket_t *) query->client->data)->origin;
+    CLIENT_USER(query->client, user)
     who_am_i = user->name;
 
     if (children->node == NULL)
@@ -4148,8 +4161,7 @@ static void exit_show_stmt(uv_async_t * handle)
             {
                 continue;
             }
-            prop_cb(((sirinet_socket_t *) query->client->data)->siridb,
-                    query->packer, 1);
+            prop_cb(siridb, query->packer, 1);
         }
     }
     else
@@ -4163,8 +4175,7 @@ static void exit_show_stmt(uv_async_t * handle)
 #if DEBUG
             assert (prop_cb != NULL);  /* all props are implemented */
 #endif
-            prop_cb(((sirinet_socket_t *) query->client->data)->siridb,
-                    query->packer, 1);
+            prop_cb(siridb, query->packer, 1);
 
             if (children->next == NULL)
             {
@@ -4184,9 +4195,10 @@ static void exit_show_stmt(uv_async_t * handle)
 static void exit_timeit_stmt(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
+    CLIENT_SIRIDB(query->client, siridb)
+
     struct timespec end;
-    char * name =
-            ((sirinet_socket_t *) query->client->data)->siridb->server->name;
+    char * name = siridb->server->name;
     clock_gettime(CLOCK_REALTIME, &end);
 
     qp_add_type(query->timeit, QP_MAP2);
@@ -4328,8 +4340,8 @@ static void async_count_series_length(uv_async_t * handle)
 static void async_drop_series(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
     query_drop_t * q_drop = (query_drop_t *) query->data;
+    CLIENT_SIRIDB(query->client, siridb)
 
     siridb_series_t * series;
     uint8_t async_more = 0;
@@ -4386,6 +4398,7 @@ static void async_drop_shards(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
     query_drop_t * q_drop = (query_drop_t *) query->data;
+    CLIENT_SIRIDB(query->client, siridb)
 
     if (q_drop->shards_list->len)
     {
@@ -4394,7 +4407,7 @@ static void async_drop_shards(uv_async_t * handle)
 
         siridb_shard_drop(
                 shard,
-                ((sirinet_socket_t *) query->client->data)->siridb);
+                siridb);
         siridb_shard_decref(shard);
     }
 
@@ -4585,7 +4598,7 @@ static void async_no_points_aggregate(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
     query_select_t * q_select = (query_select_t *) query->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     uint8_t async_more = 0;
     siridb_series_t * series;
     siridb_points_t * points;
@@ -4725,7 +4738,7 @@ static void async_select_aggregate(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
     query_select_t * q_select = (query_select_t *) query->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     uint8_t async_more = 0;
     siridb_series_t * series;
     siridb_points_t * points;
@@ -5274,7 +5287,7 @@ static void on_groups_response(slist_t * promises, uv_async_t * handle)
     sirinet_promise_t * promise;
     qp_unpacker_t unpacker;
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     siridb_group_t * group;
     qp_obj_t qp_name;
     qp_obj_t qp_series;
@@ -5418,7 +5431,7 @@ static void on_select_response(slist_t * promises, uv_async_t * handle)
     sirinet_promise_t * promise;
     qp_unpacker_t unpacker;
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     size_t err_count = 0;
     query_select_t * q_select = (query_select_t *) query->data;
     qp_obj_t qp_name;
@@ -5647,7 +5660,7 @@ static void master_select_work(uv_work_t * work)
     uv_async_t * handle = (uv_async_t *) work->data;
     siridb_query_t * query = (siridb_query_t *) handle->data;
     query_select_t * q_select = (query_select_t *) query->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
+    CLIENT_SIRIDB(query->client, siridb)
     siridb->selected_points += q_select->n;
     int rc = ct_items(
             q_select->result,
@@ -6027,8 +6040,8 @@ static int values_count_groups(siridb_group_t * group, uv_async_t * handle)
 static void finish_list_groups(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
     query_list_t * q_list = (query_list_t *) query->data;
+    CLIENT_SIRIDB(query->client, siridb)
 
     if (q_list->props == NULL)
     {
@@ -6060,8 +6073,8 @@ static void finish_list_groups(uv_async_t * handle)
 static void finish_count_groups(uv_async_t * handle)
 {
     siridb_query_t * query = (siridb_query_t *) handle->data;
-    siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb;
     query_count_t * q_count = (query_count_t *) query->data;
+    CLIENT_SIRIDB(query->client, siridb)
 
     /* Note: ct_values(..values_count_groups..) can only result in a positive
      *       value.
index 932eee26245284a5eb1cb5c1e7abaddfa52e6fa7..389a32aa805756db46fb7439e7801830ab557c95 100644 (file)
@@ -40,6 +40,7 @@
 #include <siri/help/help.h>
 #include <siri/net/bserver.h>
 #include <siri/net/clserver.h>
+#include <siri/net/pipe.h>
 #include <siri/net/socket.h>
 #include <siri/parser/listener.h>
 #include <siri/siri.h>
@@ -491,8 +492,6 @@ static void SIRI_walk_close_handlers(
 
     switch (handle->type)
     {
-    case UV_WORK:
-        break;
     case UV_SIGNAL:
         /* this is where we cleanup the signal handlers */
         uv_close(handle, NULL);
@@ -512,6 +511,20 @@ static void SIRI_walk_close_handlers(
         }
         break;
 
+    case UV_NAMED_PIPE:
+        /* This can be a pipe server with data set to NULL or a SiriDB pipe
+         * which should be destroyed.
+         */
+        if (handle->data == NULL)
+        {
+            uv_close(handle, NULL);
+        }
+        else
+        {
+            sirinet_pipe_decref(handle);
+        }
+        break;
+
     case UV_TIMER:
         /* we do not expect any timer object since they should all be closed
          * (or at least closing) at this point.